Skip to content

Conversation

moonyoungCHAE
Copy link
Contributor

Fixes #4088

Description

When a message is filtered out by the Kafka interceptor while the previous message passes through the interceptor and the producer is created, a commit occurs while the Kafka transaction is not active.

structure: Interceptor → Listener (a transactional listener that consumes a message and produces another message)

@moonyoungCHAE moonyoungCHAE force-pushed the issue-4088 branch 9 times, most recently from 876763a to 6fa0e01 Compare October 5, 2025 06:32
}

private void sendOffsetsToTransaction() {
if (this.kafkaTxManager != null && TransactionSynchronizationManager.getResource(this.kafkaTxManager.getProducerFactory()) == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the kafkaTxManager is null? Don't we need to address that also? If it is null, we can return as there is no point in sending the offsets to transaction. Do I miss anything on that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sobychacko You're right. I fixed it. Thanks for the feedback.

.

Signed-off-by: moonyougnCHAE <xpf_fl@naver.com>
.

Signed-off-by: moonyougnCHAE <xpf_fl@naver.com>
.

Signed-off-by: moonyougnCHAE <xpf_fl@naver.com>
template.send(new ProducerRecord<>(topic11, 0, 0, "bar2"));
return null;
});
assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this test ensuring that offsets are sent to only active transactions? I mean, i see that you are asserting that two records are consumed transactionally, but how do we ensure the case where there is no active txn? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sobychacko Thanks for the feedback. I modified the test to verify whether the commit occurred within a transaction by using TransactionExecutionListener.

.

Signed-off-by: moonyougnCHAE <xpf_fl@naver.com>
@sobychacko sobychacko merged commit 81f2941 into spring-projects:main Oct 14, 2025
3 checks passed
spring-builds pushed a commit that referenced this pull request Oct 14, 2025
Fixes #4088

Fix `IllegalStateException` when record filtered by interceptor in transactional listener

When using a transactional KafkaMessageListenerContainer with an early `RecordInterceptor`
that filters out records, the container attempted to send offsets to a transaction even when no
active transaction existed. This resulted in an `IllegalStateException` from the Kafka producer:
"Cannot send offsets if a transaction is not in progress (currentState=READY)"

The issue occurred because:
  - Container has a `KafkaTransactionManager` configured
  - Interceptor filters a record before listener execution
  - No transaction is started (listener never executes)
  - Container still attempts to send offsets via `sendOffsetsToTransaction()`
  - Kafka producer rejects the call as no transaction is active

  Solution:

  Add a guard clause in `sendOffsetsToTransaction()` to check for an active transaction resource in    `TransactionSynchronizationManager`. If either `kafkaTxManager` is null or there's no active transaction resource,
the method returns early without attempting to send offsets.

  The fix includes an integration test in `TransactionalContainerTests` that:
  - Uses `RecordInterceptor` to filter records
  - Tracks transaction commits via `TransactionExecutionListener`
  - Verifies only records that execute listeners trigger transactions
  - Confirms filtered records don't cause exceptions

Signed-off-by: moonyougnCHAE <xpf_fl@naver.com>
(cherry picked from commit 81f2941)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

KafkaMessageListenerContainer tries to send offsets to tx when record is filtered out by early record interceptor

2 participants